package com.bodystm.server;

import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang.ArrayUtils;
import org.apache.log4j.Logger;
import org.aspectj.bridge.MessageWriter;

import com.bodystm.algorithm.CNIBP_PWV;
import com.bodystm.algorithm.DllInterface;
import com.bodystm.algorithm.LowPerf;
import com.bodystm.algorithm.MathTools;
import com.bodystm.algorithm.MedfilterBO;
import com.bodystm.bean.Bed;
import com.bodystm.bean.BloodPressure;
import com.bodystm.bean.Ecg;
import com.bodystm.bean.datadeal.BloodPressureAnalysis;
import com.bodystm.bean.datadeal.OximetryAnalysis;
import com.bodystm.config.PublicSetting;
import com.bodystm.util.MathUtils;
import com.bodystm.web.DeviceSettings;

import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import redis.clients.jedis.Jedis;
/**
 * Consumer thread for the NIBP/PWV data of a single device.
 *
 * <p>The thread blocks on a Redis list (BRPOP), runs every sample of each
 * received packet through the native (DLL) low-pass and median filters, feeds
 * the pulse-wave-velocity calculator and, when WebSocket clients are
 * registered for the device, pushes the filtered waveform to them.
 *
 * <p>Usage:
 * <pre>
 *   ScheduleMQ4PBP consumer = new ScheduleMQ4PBP(redisKey);
 *   consumer.start();
 * </pre>
 *
 * <p>The native filter/resampler handles are allocated when the object is
 * constructed and released in the {@code finally} block of {@link #run()}.
 *
 * @author ehl
 */
public class ScheduleMQ4PBP extends Thread {
	Logger logger=Logger.getLogger(ScheduleMQ4PBP.class);
	/** First 6 bytes of the Redis key. */
	private byte[] mac;
	/** String form of {@link #mac}; used as lookup key for beds and WebSocket channels. */
	private String strMac;
	/** Redis list key this consumer pops from. */
	private byte[] rediskey;
	private BloodPressure bloodPressure=null;
	BloodPressureAnalysis bpAnalysis=new BloodPressureAnalysis();
	// Only referenced by the former pure-Java filter path; kept so construction is unchanged.
	private LowPerf lowPerf=new LowPerf();
	private CNIBP_PWV cNIBP_PWV=new CNIBP_PWV();
	// =============== algorithms are invoked through the DLL ===============
	/** Native handle of the resampler. */
	private long mathTools=DllInterface.INSTANCE.Resampling_C();
	/** Native handle of the median filter. */
	private long medfilterBO=DllInterface.INSTANCE.MedFilterBO_C();
	/** Native handle of the low-pass (Butterworth) filter. */
	private long lowPerf_dll=DllInterface.INSTANCE.butter_C();
	// ======================================================================
	private Bed bed=null;

	/**
	 * @param data Redis list key to consume; its first 6 bytes are taken as the device MAC
	 */
	public ScheduleMQ4PBP(byte[] data){
		this.mac=ArrayUtils.subarray(data,  0, 6);
		this.strMac=ByteUtil.convertByteToString(mac);
		this.rediskey=data;
	}

	/**
	 * Pops packets from Redis until the thread is interrupted or an exception
	 * occurs, then releases the Redis connection and the native handles.
	 */
	@Override
	public void run(){
		Jedis jedis=null;
		try {
			jedis = RedisManager.getJedis();
			if (jedis==null) {
				logger.error("redis服务连接失败！");
				return;
			}
			while(!isInterrupted()) {
				refreshBloodPressure();
				// Blocking BRPOP; timeout 0 means "wait until the list has data".
				// NOTE(review): while blocked here the interrupt flag is not observed.
				List<byte[]> list = jedis.brpop(0, rediskey);

				ArrayList<Channel> channels=null;
				if (WebSocketManager.wsServer!=null) {
					channels = WebSocketManager.wsServer.channelsMap4PBP2.get(strMac);
				}
				for(byte[] bs : list) {
					// Skip the key element of the BRPOP reply (recognised by its short length).
					// NOTE(review): a key of 10+ bytes would be parsed as data — confirm key size.
					if (bs.length<10) {
						continue;
					}
					// Decide once per packet so the message is built and sent consistently.
					boolean hasListeners=channels!=null&&channels.size()>0;
					String msg=processPacket(bs, hasListeners);
					if (hasListeners) {
						broadcast(channels, msg);
					}
				}
			}
		} catch (Exception e) {
			logger.error("NIBP/PWV consumer for "+strMac+" stopped by exception", e);
			// If the exception was an InterruptedException the interrupt flag has
			// been cleared; set it again so the interrupted state stays visible.
			Thread.currentThread().interrupt();
		} finally {
			RedisManager.close(jedis);
			DllInterface.INSTANCE.Resampling_D(mathTools);
			DllInterface.INSTANCE.MedFilterBO_D(medfilterBO);
			DllInterface.INSTANCE.butter_D(lowPerf_dll);
		}
	}

	/**
	 * Re-reads the bed for this MAC on every cycle, so that a bed re-assigned
	 * to this MAC in the back office is picked up instead of a stale object,
	 * and makes sure a non-empty bed has a BloodPressure instance attached.
	 */
	private void refreshBloodPressure(){
		bloodPressure=null;
		bed = DeviceSettings.hasBeds.get(strMac);
		if (null==bed||bed.getStatus().equals(PublicSetting.BED_STATUS_EMPTY)) {
			return;
		}
		bloodPressure=bed.getBloodPressure();
		if (bloodPressure==null) {
			bloodPressure=new BloodPressure();
			bed.setBloodPressure(bloodPressure);
		}
	}

	/**
	 * Filters every sample of one packet and updates the PWV calculation buffers.
	 * Sample {@code i} occupies bytes {@code 8+4*i .. 11+4*i}: two bytes for
	 * nNIBP_PWV_I followed by two bytes for nNIBP_PWV_II.
	 *
	 * @param bs           raw packet popped from Redis
	 * @param hasListeners whether a WebSocket message has to be assembled
	 * @return {@code "/<mac>/nibp_pwv/<v1,v2,...>/<max>/<min>"} when
	 *         {@code hasListeners} is true, otherwise {@code null}
	 */
	private String processPacket(byte[] bs, boolean hasListeners){
		StringBuilder sbResultData=new StringBuilder();
		sbResultData.append("/"+strMac+"/nibp_pwv/");
		for(int i=0;i<BloodPressureAnalysis.BP_POINTS_PER_PACK;i++){
			int offset=8+4*i;
			// First channel: low-pass filter, then median filter.
			double nNIBP_PWV_I=Double.parseDouble(MathUtils.binary(ArrayUtils.subarray(bs, offset, offset+2), 10));
			double tmp=DllInterface.INSTANCE.butter_B(nNIBP_PWV_I,2, lowPerf_dll);
			tmp=DllInterface.INSTANCE.MedFilterBO_B(tmp, 2, medfilterBO);
			// Track extremes; they are sent along for drawing the waveform.
			bpAnalysis.extremeStastics(tmp);
			cNIBP_PWV.m_fNIBP_PWV_1[cNIBP_PWV.curNum]=(float) tmp;
			// Second channel: same filter chain with a different second argument (3).
			double nNIBP_PWV_II=Double.parseDouble(MathUtils.binary(ArrayUtils.subarray(bs, offset+2, offset+4), 10));
			double tmp2=DllInterface.INSTANCE.butter_B(nNIBP_PWV_II,3, lowPerf_dll);
			tmp2 = DllInterface.INSTANCE.MedFilterBO_B(tmp2, 3, medfilterBO);
			cNIBP_PWV.m_fNIBP_PWV_2[cNIBP_PWV.curNum]=(float) tmp2;
			cNIBP_PWV.curNum++;
			if (cNIBP_PWV.curNum==CNIBP_PWV.NIBP_PWV_CalcBufferLen) {
				// Buffers full: compute the pulse wave velocity and start over.
				// The returned value is currently not used here.
				cNIBP_PWV.Calculate_delta_T(cNIBP_PWV.m_fNIBP_PWV_1, cNIBP_PWV.m_fNIBP_PWV_2, CNIBP_PWV.NIBP_PWV_CalcBufferLen);
				cNIBP_PWV.curNum=0;
			}
			if (hasListeners) {
				// Resampling: the filtered value is emitted (nResamplingRet + 1) times.
				int nResamplingRet=DllInterface.INSTANCE.Resampling_B((float)1.0/BloodPressureAnalysis.sampleRate, (float)1.0/(BloodPressureAnalysis.npTargetSampleRate), tmp, 0, mathTools);
				for (int j = 0; j <= nResamplingRet; j++) {
					sbResultData.append(tmp);
					sbResultData.append(",");
				}
			}
		}
		if (!hasListeners) {
			return null;
		}
		// Strip the trailing comma only if one exists; when no value was emitted
		// the last character is the '/' of the prefix and must be kept.
		int last=sbResultData.length()-1;
		if (last>=0&&sbResultData.charAt(last)==',') {
			sbResultData.deleteCharAt(last);
		}
		sbResultData.append("/");
		sbResultData.append(bpAnalysis.maxNum);
		sbResultData.append("/");
		sbResultData.append(bpAnalysis.minNum);
		return sbResultData.toString();
	}

	/** Sends {@code msg} as a text frame to every open channel in {@code channels}. */
	private void broadcast(List<Channel> channels, String msg){
		for(Channel ch:channels){
			if (ch!=null&&ch.isOpen()) {
				ch.writeAndFlush(new TextWebSocketFrame(msg));
			}
		}
	}
}
